原文:Hello_World
状态:待校对
翻译:Bingjian-Zhu
校对:

CC-BY-SA

前提条件

本教程假设RabbitMQ已经安装在你本机的 (5672)端口。如果你使用了不同的主机、端口或者凭证,连接设置就需要作出一些对应的调整。

介绍

RabbitMQ是一个消息代理。它的工作就是接收和转发消息。你可以把它想像成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。在这个比喻中,RabbitMQ就扮演着邮箱、邮局以及邮递员的角色。

RabbitMQ和邮局的主要区别在于,它处理纸张,而是接收、存储和发送消息(message)这种二进制数据。

下面是RabbitMQ和消息所涉及到的一些术语。

  • 生产(Producing)的意思就是发送。发送消息的程序就是一个生产者(producer)。我们一般用”P”来表示: img
  • 队列(queue)就是存在于RabbitMQ中邮箱的名称。虽然消息的传输经过了RabbitMQ和你的应用程序,但是它只能被存储于队列当中。实质上队列就是个巨大的消息缓冲区,它的大小只受主机内存和硬盘限制。多个生产者(producers)可以把消息发送给同一个队列,同样,多个消费者(consumers)也能够从同一个队列(queue)中获取数据。队列可以绘制成这样(图上是队列的名称): img
  • 在这里,消费(Consuming)和接收(receiving)是同一个意思。一个消费者(consumer)就是一个等待获取消息的程序。我们把它绘制为”C”: img

Hello World!

(使用Go客户端)

在本教程的这一部分中,我们将在Go中编写两个小程序:发送单个消息的生产者,以及接收消息并将其打印出来的消费者。我们将忽略Go RabbitMQ API中的一些细节,这里传递“Hello World”消息。 在下图中,“P”是生产者,“C”是消费者。中间的框是一个队列(保存消息的地方)。 (P) -> [|||] -> (C)

Go RabbitMQ客户端库 RabbitMQ使用的是AMQP 0.9.1协议。这是一个用于消息传递的开放、通用的协议。针对不同编程语言有大量的RabbitMQ客户端可用。在本教程中,我们将使用Go amqp客户端。

首先,使用go get安装amqp:go get github.com/streadway/amqp

发送

(P) -> [|||]

我们将编写消息生产者(发送者)send.go和我们的消息消费者(接收者)receive.go。 发布者将连接到RabbitMQ,发送单个消息,然后退出。 在send.go中,我们需要先导入包:

  1. package main
  2. import (
  3. "log"
  4. "github.com/streadway/amqp"
  5. )

我们还需要一个辅助函数来检查每个amqp调用的返回值

  1. func failOnError(err error, msg string) {
  2. if err != nil {
  3. log.Fatalf("%s: %s", msg, err)
  4. }
  5. }

然后连接到RabbitMQ服务器

  1. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  2. failOnError(err, "Failed to connect to RabbitMQ")
  3. defer conn.Close()

配置连接套接字,它主要定义连接的协议和身份验证等。接下来,我们创建一个channel来传递消息:

  1. ch, err := conn.Channel()
  2. failOnError(err, "Failed to open a channel")
  3. defer ch.Close()

发送前,我们必须声明一个队列供我们发送,然后才能向队列发送消息:

  1. q, err := ch.QueueDeclare(
  2. "hello", // name
  3. false, // durable
  4. false, // delete when unused
  5. false, // exclusive
  6. false, // no-wait
  7. nil, // arguments
  8. )
  9. failOnError(err, "Failed to declare a queue")
  10. body := "Hello World!"
  11. err = ch.Publish(
  12. "", // exchange
  13. q.Name, // routing key
  14. false, // mandatory
  15. false, // immediate
  16. amqp.Publishing {
  17. ContentType: "text/plain",
  18. Body: []byte(body),
  19. })
  20. failOnError(err, "Failed to publish a message")

声明队列是幂等的——只有在它不存在的情况下才会创建它。消息内容是一个字节数组,因此你可以编写任何内容。 [|||] -> (C)

接收

以上是消息生产者。我们的消费者需要监听来自RabbitMQ的消息,因此与生产者不同,它需要持续运行以监听消息并将其打印出来 前提条件 - 图8 代码receive.go具有与send相同的导入包和辅助函数:

  1. package main
  2. import (
  3. "log"
  4. "github.com/streadway/amqp"
  5. )
  6. func failOnError(err error, msg string) {
  7. if err != nil {
  8. log.Fatalf("%s: %s", msg, err)
  9. }
  10. }

设置与生产者相同,首先打开一个连接和一个Channel,并声明我们要消费的队列。请注意,这与发送的队列相匹配

  1. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  2. failOnError(err, "Failed to connect to RabbitMQ")
  3. defer conn.Close()
  4. ch, err := conn.Channel()
  5. failOnError(err, "Failed to open a channel")
  6. defer ch.Close()
  7. q, err := ch.QueueDeclare(
  8. "hello", // name
  9. false, // durable
  10. false, // delete when usused
  11. false, // exclusive
  12. false, // no-wait
  13. nil, // arguments
  14. )
  15. failOnError(err, "Failed to declare a queue")

注意,在这里也做了队列声明。因为消费者可能在生产者启动前就运行了,所以要确保使用消息之前队列已经存在。 我们即将告诉服务器从队列中传递消息。因为它会异步地向我们发送消息,所以我们将在goroutine中读取来自channel (由amqp :: Consume返回)的消息。

  1. msgs, err := ch.Consume(
  2. q.Name, // queue
  3. "", // consumer
  4. true, // auto-ack
  5. false, // exclusive
  6. false, // no-local
  7. false, // no-wait
  8. nil, // args
  9. )
  10. failOnError(err, "Failed to register a consumer")
  11. forever := make(chan bool)
  12. go func() {
  13. for d := range msgs {
  14. log.Printf("Received a message: %s", d.Body)
  15. }
  16. }()
  17. log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
  18. <-forever

receive.go

代码整合

运行生产者:go run send.go 运行消费者:go run receive.go 消费者将通过RabbitMQ打印从生产者处获得的消息。消费者将继续运行,等待消息(使用Ctrl-C停止消息)。可以尝试从另一个终端再次运行生产者来发送消息。

我们已经学会如何发送消息到一个已知队列中并接收消息。是时候移步到第二部分了,我们将会建立一个简单的工作队列(work queue)。